Aggregation — Typed and Untyped Grouping

You can group records in a Dataset by a grouping expression and compute aggregates over each group of records.

Use the agg method to compute aggregates over the entire Dataset, i.e. without creating groups first, so that the whole Dataset is treated as a single group.

scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+

The following aggregate operators are available:

  1. groupBy for untyped aggregations with Column- or String-based column names.

  2. groupByKey for strongly-typed aggregations where the data is grouped by a given key function.

  3. rollup

  4. cube

The untyped aggregations, e.g. groupBy, rollup, and cube, return RelationalGroupedDatasets while groupByKey returns a KeyValueGroupedDataset.
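The return types above, and the extra subtotal rows that rollup and cube produce, can be sketched as follows. This is a minimal sketch: the sales data, the column names, and the local session settings are assumptions made for illustration only (in spark-shell, getOrCreate() reuses the existing session).

```scala
import org.apache.spark.sql.{KeyValueGroupedDataset, RelationalGroupedDataset, SparkSession}
import org.apache.spark.sql.functions.sum

// Assumed local session; not part of the original text.
val spark = SparkSession.builder().master("local[*]").appName("grouping-operators").getOrCreate()
import spark.implicits._

// Hypothetical sales data (city, year, amount).
val sales = Seq(
  ("Warsaw", 2016, 100),
  ("Warsaw", 2017, 200),
  ("Toronto", 2017, 50)
).toDF("city", "year", "amount")

// The untyped operators all return a RelationalGroupedDataset.
val grouped: RelationalGroupedDataset = sales.groupBy("city", "year")
val rolledUp: RelationalGroupedDataset = sales.rollup("city", "year")
val cubed: RelationalGroupedDataset = sales.cube("city", "year")

// The typed operator returns a KeyValueGroupedDataset keyed by the function result.
val byCity: KeyValueGroupedDataset[String, (String, Int, Int)] =
  sales.as[(String, Int, Int)].groupByKey(_._1)

// rollup: one row per (city, year), a subtotal per city, and a grand total.
// A null in a grouping column marks "all values" of that column.
val rollupTotals = rolledUp.agg(sum("amount") as "total")
rollupTotals.orderBy("city", "year").show()

// cube: everything rollup gives, plus a subtotal per year across all cities.
val cubeTotals = cubed.agg(sum("amount") as "total")
cubeTotals.orderBy("city", "year").show()
```

With this data, cube yields more rows than rollup because it also aggregates by year alone.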

Aggregates on Entire Dataset (Without Groups) — agg Operator

agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

agg computes aggregate expressions on all the records in a Dataset.

Note
agg is simply a shortcut for groupBy().agg(…).
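As a minimal sketch, the three agg variants listed above and the equivalent groupBy().agg form could be used like this. The local session settings and the value names are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

// Assumed local session; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("agg-overloads").getOrCreate()
import spark.implicits._

val nums = spark.range(10)

// 1. Column-based aggregate expressions
val viaColumns = nums.agg(sum("id") as "sum", max("id") as "max")

// 2. Map of column name -> aggregate function name
val viaMap = nums.agg(Map("id" -> "sum"))

// 3. (column name, aggregate function name) pairs
val viaPairs = nums.agg("id" -> "sum", "id" -> "max")

// The same aggregation written with an explicit, empty groupBy()
val viaGroupBy = nums.groupBy().agg(sum("id") as "sum", max("id") as "max")

viaColumns.show()
viaGroupBy.show()
```

The Map-based variant holds one aggregate per column name, so the pair-based variant is the one to reach for when the same column needs several aggregates.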

Grouping by Columns — groupBy Untyped Operators

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

groupBy groups the Dataset by the specified columns, given either as Column objects or as column names (Strings). It returns a RelationalGroupedDataset to apply aggregations to.

// 10^3-record large data set
val ints = 1 to math.pow(10, 3).toInt

scala> val dataset = ints.toDF("n").withColumn("m", 'n % 2)
dataset: org.apache.spark.sql.DataFrame = [n: int, m: int]

scala> dataset.count
res0: Long = 1000

scala> dataset.groupBy('m).agg(sum('n)).show
+---+------+
|  m|sum(n)|
+---+------+
|  1|250000|
|  0|250500|
+---+------+

Internally, groupBy first resolves the columns and then builds a RelationalGroupedDataset.

Note
The following session uses the dataset defined in the Test Setup section below.

scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa|         150.0|     0.205|
| bbb|         250.0|     0.475|
+----+--------------+----------+

scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa|      200|      0.29|
| bbb|      200|      0.53|
| bbb|      300|      0.42|
| aaa|      100|      0.12|
+----+---------+----------+

scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa|    2|
| bbb|    2|
+----+-----+

scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa|      0.29|
| bbb|      0.53|
+----+----------+

scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa|      0.41|
| bbb|      0.95|
+----+----------+

scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId|        sum(score)|
+---------+------------------+
|      300|              0.42|
|      100|              0.12|
|      200|0.8200000000000001|
+---------+------------------+

groupByKey Typed Operator

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

groupByKey groups records (of type T) by the input func. It returns a KeyValueGroupedDataset to apply aggregation to.

Note
groupByKey is part of Dataset's experimental API.

scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
|  300|       1|
|  100|       1|
|  200|       2|
+-----+--------+

import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId|               sum|
+---------+------------------+
|      100|              0.12|
|      200|0.8200000000000001|
|      300|              0.42|
+---------+------------------+

RelationalGroupedDataset

RelationalGroupedDataset is the result of executing the untyped operators groupBy, rollup and cube.

RelationalGroupedDataset is also the result of executing the pivot operator on grouped records, i.e. on another RelationalGroupedDataset.

It offers the following operators to work on a grouped collection of records:

  • agg

  • count

  • mean

  • max

  • avg

  • min

  • sum

  • pivot
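Of the operators listed above, pivot is the only one not shown elsewhere on this page. The following is a minimal sketch of it, assuming the same four records as in the Test Setup section but held in a plain DataFrame; the value names and the local session settings are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("pivot-sketch").getOrCreate()
import spark.implicits._

val tokens = Seq(
  ("aaa", 100, 0.12),
  ("aaa", 200, 0.29),
  ("bbb", 200, 0.53),
  ("bbb", 300, 0.42)
).toDF("name", "productId", "score")

// Group by name, then turn every distinct productId into its own column
// holding the sum of scores; missing combinations come out as null.
val pivoted = tokens.groupBy("name").pivot("productId").sum("score")
pivoted.orderBy("name").show()

// Listing the pivot values up front spares Spark the extra job that
// computes the distinct values, and limits the output columns.
val pivotedExplicit = tokens.groupBy("name").pivot("productId", Seq(100, 200)).sum("score")
pivotedExplicit.orderBy("name").show()
```

pivot itself returns a RelationalGroupedDataset, so it has to be followed by an aggregation such as sum before anything can be shown.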

KeyValueGroupedDataset

KeyValueGroupedDataset is an experimental interface to a result of executing the strongly-typed operator groupByKey.

scala> val tokensByName = dataset.groupByKey(_.name)
tokensByName: org.apache.spark.sql.KeyValueGroupedDataset[String,Token] = org.apache.spark.sql.KeyValueGroupedDataset@1e3aad46

It holds the keys that the records were grouped by; the keys operator returns them as a Dataset.

scala> tokensByName.keys.show
+-----+
|value|
+-----+
|  aaa|
|  bbb|
+-----+

The following methods are available for any KeyValueGroupedDataset to work on groups of records:

  1. agg (with 1 to 4 typed columns)

  2. mapGroups

  3. flatMapGroups

  4. reduceGroups

  5. count, which is a special case of agg with the count function applied.

  6. cogroup
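The methods above can be sketched on the Token records from the Test Setup section as follows. This is a sketch meant to be pasted into the Spark shell with :paste (in a compiled application the case class would have to be defined at the top level). The labels dataset, the value names, and the use of sum(...).as[Double] as a typed column are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

case class Token(name: String, productId: Int, score: Double)

// Assumed local session; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("kvgd-sketch").getOrCreate()
import spark.implicits._

val tokens = Seq(
  Token("aaa", 100, 0.12),
  Token("aaa", 200, 0.29),
  Token("bbb", 200, 0.53),
  Token("bbb", 300, 0.42)
).toDS

val byName = tokens.groupByKey(_.name)

// agg with two typed columns: the result is Dataset[(key, sum, max)].
val sumAndMax = byName.agg(sum("score").as[Double], max("score").as[Double])

// mapGroups: exactly one output record per group.
val counts = byName.mapGroups { (name, group) => (name, group.size) }

// flatMapGroups: zero or more output records per group.
val productIds = byName.flatMapGroups { (name, group) =>
  group.map(token => (name, token.productId))
}

// reduceGroups: reduce each group pairwise to one record (here the top score).
val best = byName.reduceGroups((a: Token, b: Token) => if (a.score >= b.score) a else b)

// cogroup: process the groups of two grouped datasets that share a key type.
// Keys present on only one side get an empty iterator for the other side.
val labels = Seq(("aaa", "first"), ("ccc", "third")).toDS.groupByKey(_._1)
val sizes = byName.cogroup(labels) { (name, ts, ls) =>
  Iterator((name, ts.size, ls.size))
}

counts.show()
best.show()
sizes.show()
```

mapGroups and flatMapGroups hand over each group as an iterator and give no partial aggregation, so agg or reduceGroups is usually the better fit when the computation is a plain aggregate.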

Test Setup

This is the setup for the examples on this page that use the Token dataset. Paste it into the Spark shell using :paste.

import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
  Token("aaa", 200, 0.29) ::
  Token("bbb", 200, 0.53) ::
  Token("bbb", 300, 0.42) :: Nil
// Cache the dataset so the following queries won't load/recompute data over and over again.
val dataset = data.toDS.cache
